QuickOPC User's Guide and Reference
OPC Reactive Extensions (Rx/OPC)
Development Models > Reactive Programming Model > OPC Reactive Extensions (Rx/OPC)
In This Topic

Introduction

OPC Reactive Extensions (Rx/OPC) are OPC-specific classes that implement the Rx interfaces. With Rx/OPC, you can e.g.:

For OPC Data Access, OPC XML-DA and OPC-UA data, Rx/OPC gives you the ability to work with flows representing OPC (monitored) item changes, or flows that represents values to be written into an OPC item (node).

For OPC Alarms & Events, Rx/OPC gives you the ability to work with flows representing OPC event notifications, or flows that represent OPC event conditions to be acknowledged.

In the example below, your application is required to continuously monitor the value of an OPC item, and when it is available (there is no failure obtaining the value), make some computations with it (we will multiply the value by 1000), and write the results into a different OPC item. This logic can be expressed by following code:

DAItemChangedObservable.Create<int>(
         "", "OPCLabs.KitServer.2", "Simulation.Incrementing (1 s)", 100)
     .Where(e => e.Exception == null)
     .Select(e => e.Vtq.Value * 1000)
     .Subscribe(DAWriteItemValueObserver.Create<int>(
         "", "OPCLabs.KitServer.2", "Simulation.Register_I4"));

Let’s dissect what this example does:

  1. It creates an observable sequence for significant changes in OPC-DA item "Simulation.Incrementing (1 s)".
  2. The “Where” clause filters (from the observable sequence) only the changes that have a null Exception, i.e. those that carry a valid DAVtq object (value/timestamp/quality).
  3. The “Select” clause takes the actual value from the Vtq property (it is of type DAVtq<int>), and returns the value multiplied by 1000.
  4. An observer that writes incoming vales into the “Simulation.Register_I4” OPC-DA item is created.
  5. The observer is subscribed to the transformed (processed) observable sequence.

 

As you can see, the code that does this kind of OPC data processing is very concise – all the extra “plumbing” needed in imperative programming model is gone, and only the meaningful pieces remain. Programs written in this way clearly express their intent, and the logic that handles certain functionality is concentrated in one place and not spread around various classes and methods.

Examples

// Shows how to continuously transform values of OPC-DA item, and write the results into a second item.
// Requires Microsoft Reactive Extensions (Rx).

using System;
using System.Reactive.Linq;
using System.Threading;
using OpcLabs.EasyOpc.DataAccess;
using OpcLabs.EasyOpc.DataAccess.Reactive;

namespace ReactiveDocExamples
{
    namespace _DAReactive
    {
        class Composition
        {
            public static void Pipeline()
            {
                Console.WriteLine("Creating source observable...");
                DAItemChangedObservable<int> source =
                    DAItemChangedObservable.Create<int>("", "OPCLabs.KitServer.2", "Simulation.Incrementing (1 s)", 100);

                Console.WriteLine("Creating processed observable (takes valid input values and multiplies them by 1000)...");
                IObservable<int> processed = source
                    .Where(e => e.Exception is null)
                    .Select(e => e.TypedVtq.TypedValue * 1000);

                Console.WriteLine("Creating observer to write values into OPC item...");
                DAWriteItemValueObserver<int> observer = 
                    DAWriteItemValueObserver.Create<int>("", "OPCLabs.KitServer.2", "Simulation.Register_I4");

                Console.WriteLine("Monitoring changes of the target OPC item using traditional means...");
                int handle = EasyDAClient.SharedInstance.SubscribeItem("", "OPCLabs.KitServer.2", "Simulation.Register_I4",
                    100, (_, e) => Console.WriteLine(e.Vtq));

                Console.WriteLine("Subscribing the observer to the processed observable...");
                using (processed.Subscribe(observer))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10 * 1000);

                    Console.WriteLine("Unsubscribing the observer from the processed observable...");
                }

                Console.WriteLine("Finalizing monitoring...");
                EasyDAClient.SharedInstance.UnsubscribeItem(handle);

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
// Shows how to continuously transform values of OPC-UA node, and write the results into a second node.
// Requires Microsoft Reactive Extensions (Rx).

using System;
using System.Reactive.Linq;
using System.Threading;
using OpcLabs.EasyOpc.UA;
using OpcLabs.EasyOpc.UA.Reactive;

namespace ReactiveDocExamples
{
    namespace _UAReactive
    {
        class Composition
        {
            public static void Pipeline()
            {
                // Define which server we will work with.
                UAEndpointDescriptor endpointDescriptor =
                    "opc.tcp://opcua.demo-this.com:51210/UA/SampleServer";
                // or "http://opcua.demo-this.com:51211/UA/SampleServer" (currently not supported)
                // or "https://opcua.demo-this.com:51212/UA/SampleServer/"

                Console.WriteLine("Creating source observable...");
                UADataChangeNotificationObservable<int> source =
                    UADataChangeNotificationObservable.Create<int>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=11017", 100);

                Console.WriteLine("Creating processed observable (takes valid input values and take modulo 1000)...");
                IObservable<int> processed = source
                    .Where(e => e.Exception is null)
                    .Select(e => e.TypedAttributeData.TypedValue % 1000);

                Console.WriteLine("Creating observer to write values into OPC node...");
                UAWriteValueObserver<int> observer =
                    UAWriteValueObserver.Create<int>(
                        endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10389");

                Console.WriteLine("Monitoring changes of the target OPC node using traditional means...");
                int handle = EasyUAClient.SharedInstance.SubscribeDataChange(
                    endpointDescriptor, "nsu=http://test.org/UA/Data/ ;i=10389",
                    100, (_, e) => Console.WriteLine(e.AttributeData));

                Console.WriteLine("Subscribing the observer to the processed observable...");
                using (processed.Subscribe(observer))
                {
                    Console.WriteLine("Waiting for 10 seconds...");
                    Thread.Sleep(10 * 1000);

                    Console.WriteLine("Unsubscribing the observer from the processed observable...");
                }

                Console.WriteLine("Finalizing monitoring...");
                EasyUAClient.SharedInstance.UnsubscribeMonitoredItem(handle);

                Console.WriteLine("Waiting for 2 seconds...");
                Thread.Sleep(2 * 1000);
            }
        }
    }
}
See Also

Examples - Reactive Programming